package com.uxsino.reactorq.commons;

import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages ActiveMQ connections and creates queue/topic subscribers and fluxes.
 */
public class ReactorQFactory {
    private static final Logger logger = LoggerFactory.getLogger(ReactorQFactory.class);

    // Lazily created by getTopicConnection(); only ever holds a started connection.
    private TopicConnection topicConnection = null;

    // Lazily created by getQueueConnection(); only ever holds a started connection.
    private QueueConnection queueConnection = null;

    private final String userName;

    private final String password;

    private final String brokerURL;

    /**
     * Stores the broker credentials and URL. No connection is opened here;
     * connections are created on first use by {@link #getTopicConnection()} and
     * {@link #getQueueConnection()}.
     *
     * @param userName  user name handed to {@link ActiveMQConnectionFactory}
     * @param password  password handed to {@link ActiveMQConnectionFactory}
     * @param brokerURL broker URL handed to {@link ActiveMQConnectionFactory}
     */
    public ReactorQFactory(String userName, String password, String brokerURL) {
        this.userName = userName;
        this.password = password;
        this.brokerURL = brokerURL;
    }

    /**
     * Returns the cached topic connection, creating and starting it on first use.
     * <p>
     * The method is synchronized so that concurrent first calls do not each open
     * a connection. The connection is cached only after {@code start()} succeeds;
     * if starting fails, the new connection is closed and the next call retries.
     *
     * @return the started topic connection held by this factory
     * @throws JMSException if the connection cannot be created or started
     */
    public synchronized TopicConnection getTopicConnection() throws JMSException {
        if (topicConnection == null) {
            TopicConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
            TopicConnection connection = factory.createTopicConnection();
            try {
                connection.start();
            } catch (JMSException | RuntimeException e) {
                // Do not cache (or leak) a connection that never started.
                closeQuietly(connection);
                throw e;
            }
            topicConnection = connection;
        }
        return topicConnection;
    }

    /**
     * Returns the cached queue connection, creating and starting it on first use.
     * <p>
     * The method is synchronized so that concurrent first calls do not each open
     * a connection. The connection is cached only after {@code start()} succeeds;
     * if starting fails, the new connection is closed and the next call retries.
     *
     * @return the started queue connection held by this factory
     * @throws JMSException if the connection cannot be created or started
     */
    public synchronized QueueConnection getQueueConnection() throws JMSException {
        if (queueConnection == null) {
            QueueConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
            QueueConnection connection = factory.createQueueConnection();
            try {
                connection.start();
            } catch (JMSException | RuntimeException e) {
                // Do not cache (or leak) a connection that never started.
                closeQuietly(connection);
                throw e;
            }
            queueConnection = connection;
        }
        return queueConnection;
    }

    /**
     * Closes a connection whose start failed. A failure to close is logged rather
     * than thrown so that the original start error reaches the caller.
     */
    private static void closeQuietly(javax.jms.Connection connection) {
        try {
            connection.close();
        } catch (JMSException closeError) {
            logger.warn("failed to close activemq connection after start failure", closeError);
        }
    }

    /**
     * Creates a {@link QueueSubscriber} on this factory's queue connection.
     *
     * @param queueName queue name, passed through to the subscriber
     * @param cls       message class, passed through to the subscriber
     * @throws JMSException if the queue connection cannot be obtained or the
     *                      subscriber constructor fails
     */
    public <T> QueueSubscriber<T> createQueueSubscriber(String queueName, Class<T> cls) throws JMSException {
        return new QueueSubscriber<T>(getQueueConnection(), queueName, cls);
    }

    /**
     * Creates a {@link QueueSubscriber} on this factory's queue connection.
     *
     * @param queueName   queue name, passed through to the subscriber
     * @param cls         message class, passed through to the subscriber
     * @param acknowledge acknowledge flag, passed through to the subscriber
     * @throws JMSException if the queue connection cannot be obtained or the
     *                      subscriber constructor fails
     */
    public <T> QueueSubscriber<T> createQueueSubscriber(String queueName, Class<T> cls, boolean acknowledge) throws JMSException {
        return new QueueSubscriber<T>(getQueueConnection(), queueName, cls, acknowledge);
    }

    /**
     * Creates a {@link QueueSubscriber} for an existing JMS queue on this
     * factory's queue connection.
     *
     * @param queue JMS queue, passed through to the subscriber
     * @param cls   message class, passed through to the subscriber
     * @throws JMSException if the queue connection cannot be obtained or the
     *                      subscriber constructor fails
     */
    public <T> QueueSubscriber<T> createQueueSubscriber(javax.jms.Queue queue, Class<T> cls) throws JMSException {
        return new QueueSubscriber<T>(getQueueConnection(), queue, cls);
    }

    /**
     * Creates a {@link TopicSubscriber} on this factory's topic connection.
     *
     * @param queueName topic name (despite the parameter name), passed through
     *                  to the subscriber
     * @param cls       message class, passed through to the subscriber
     * @throws JMSException if the topic connection cannot be obtained or the
     *                      subscriber constructor fails
     */
    public <T> TopicSubscriber<T> createTopicSubscriber(String queueName, Class<T> cls) throws JMSException {
        return new TopicSubscriber<T>(getTopicConnection(), queueName, cls);
    }

    /**
     * Creates a {@link TopicSubscriber} for an existing JMS topic on this
     * factory's topic connection.
     *
     * @param queue JMS topic (despite the parameter name), passed through to the
     *              subscriber
     * @param cls   message class, passed through to the subscriber
     * @throws JMSException if the topic connection cannot be obtained or the
     *                      subscriber constructor fails
     */
    public <T> TopicSubscriber<T> createTopicSubscriber(javax.jms.Topic queue, Class<T> cls) throws JMSException {
        return new TopicSubscriber<T>(getTopicConnection(), queue, cls);
    }

    /**
     * @return the broker URL this factory was constructed with
     */
    public String getBrokerURL() {
        return brokerURL;
    }

    /**
     * Creates a {@link QueueFlux} on this factory's queue connection.
     *
     * @param queueName   queue name, passed through to the flux
     * @param cls         message class, passed through to the flux
     * @param acknowledge acknowledge flag, passed through to the flux
     * @throws JMSException if the queue connection cannot be obtained or the
     *                      flux constructor fails
     */
    public <T> QueueFlux<T> createQueueFlux(String queueName, Class<T> cls, boolean acknowledge) throws JMSException {
        return new QueueFlux<T>(getQueueConnection(), queueName, cls, acknowledge);
    }

    /**
     * Creates a {@link QueueFlux} on this factory's queue connection.
     *
     * @param queueName queue name, passed through to the flux
     * @param cls       message class, passed through to the flux
     * @throws JMSException if the queue connection cannot be obtained or the
     *                      flux constructor fails
     */
    public <T> QueueFlux<T> createQueueFlux(String queueName, Class<T> cls) throws JMSException {
        return new QueueFlux<T>(getQueueConnection(), queueName, cls);
    }

    /**
     * Creates a {@link QueueFlux} on this factory's queue connection.
     *
     * @param queueName   queue name, passed through to the flux
     * @param cls         message class, passed through to the flux
     * @param receiverId  receiver id, passed through to the flux
     * @param acknowledge acknowledge flag, passed through to the flux
     * @throws JMSException if the queue connection cannot be obtained or the
     *                      flux constructor fails
     */
    public <T> QueueFlux<T> createQueueFlux(String queueName, Class<T> cls, String receiverId, boolean acknowledge)
        throws JMSException {
        return new QueueFlux<T>(getQueueConnection(), queueName, cls, receiverId, acknowledge);
    }

    /**
     * Creates a {@link TopicFlux} with a {@code null} receiver id; equivalent to
     * {@code createTopicFlux(topicName, cls, null)}.
     *
     * @param topicName topic name, passed through to the flux
     * @param cls       message class, passed through to the flux
     * @throws JMSException if the topic connection cannot be obtained or the
     *                      flux constructor fails
     */
    public <T> TopicFlux<T> createTopicFlux(String topicName, Class<? extends T> cls) throws JMSException {
        return createTopicFlux(topicName, cls, null);
    }

    /**
     * Creates a {@link TopicFlux} on this factory's topic connection.
     *
     * @param topicName  topic name, passed through to the flux
     * @param cls        message class, passed through to the flux
     * @param receiverId receiver id, passed through to the flux; may be
     *                   {@code null} (the two-argument overload passes null)
     * @throws JMSException if the topic connection cannot be obtained or the
     *                      flux constructor fails
     */
    public <T> TopicFlux<T> createTopicFlux(String topicName, Class<? extends T> cls, String receiverId)
        throws JMSException {
        return new TopicFlux<T>(getTopicConnection(), topicName, cls, receiverId);
    }

    // static members

    // volatile so a factory installed by one thread is visible to readers on others.
    private static volatile ReactorQFactory defaultFactory = null;

    /**
     * Replaces the process-wide default factory with a new one built from the
     * given settings. Any previously installed default factory is discarded
     * without closing its connections.
     *
     * @param userName  user name for the new factory
     * @param password  password for the new factory
     * @param brokerURL broker URL for the new factory
     * @return the newly installed default factory
     */
    public static ReactorQFactory setDefaultFactory(String userName, String password, String brokerURL) {
        logger.info("setting default activemq broker to {}", brokerURL);
        ReactorQFactory created = new ReactorQFactory(userName, password, brokerURL);
        defaultFactory = created;
        return created;
    }

    /**
     * @return the default factory, or {@code null} if
     *         {@link #setDefaultFactory(String, String, String)} has not been called
     */
    public static ReactorQFactory getDefaultFactory() {
        return defaultFactory;
    }

}
